package com.aydx.minirpc.core.server;

import com.aydx.minirpc.core.MiniRpcConfig;
import com.aydx.minirpc.core.MiniRpcContext;
import com.aydx.minirpc.core.RemoteCallPackage;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

import static io.netty.util.ReferenceCountUtil.release;

/**
 * Netty-based server endpoint of mini-rpc.
 *
 * <p>The instance plays two roles: it bootstraps the listening socket in
 * {@link #startServer()}, and it is itself the (sharable) inbound handler that is
 * appended to every accepted channel's pipeline. Inbound bytes are split into frames
 * on the {@code {mrpc-end}} delimiter; each frame is deserialized into a
 * {@link RemoteCallPackage}, executed through the context's answer-remote-call
 * component, and the resulting bytes are written back on the same channel.
 *
 * <p>Created by aydx on 2019/1/6.
 */
@ChannelHandler.Sharable
public class NettyMinirpcServer extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(NettyMinirpcServer.class);

    /** Marker that terminates every inbound request frame. */
    private static final String FRAME_DELIMITER = "{mrpc-end}";

    /** Upper bound, in bytes, of a single inbound frame (1 GiB). */
    private static final int MAX_FRAME_LENGTH = 1024 * 1024 * 1024;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap b;
    private final MiniRpcContext rpcContext;

    /**
     * @param rpcContext context supplying the configuration (listen port), the
     *                   serializer factory and the remote-call executor
     */
    public NettyMinirpcServer(MiniRpcContext rpcContext) {
        this.rpcContext = rpcContext;
    }

    /**
     * Creates the event loop groups, configures the bootstrap and binds the port taken
     * from the context's configuration, blocking until the bind has completed.
     *
     * <p>Failures are logged and not rethrown. When start-up does not complete, the
     * event loop groups created here are shut down again so their threads do not
     * outlive the failed attempt.
     */
    public void startServer() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        boolean started = false;
        try {
            b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // Explicit charset: the delimiter is part of the wire protocol and
                            // must not depend on the JVM's default encoding.
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH,
                                    Unpooled.copiedBuffer(FRAME_DELIMITER.getBytes(StandardCharsets.UTF_8))));
                            ch.pipeline().addLast(NettyMinirpcServer.this);
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            logger.info("NettyServer.startServer Start ,port:{}", rpcContext.getMiniRpcConfig().getSelfServicePort());
            b.bind(rpcContext.getMiniRpcConfig().getSelfServicePort()).sync();
            logger.info("NettyServer.startServer Start End ,port:{}", rpcContext.getMiniRpcConfig().getSelfServicePort());
            started = true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("NettyServer.startServer Start Error ,port:{}", rpcContext.getMiniRpcConfig().getSelfServicePort(), e);
        } catch (Exception e) {
            logger.error("NettyServer.startServer Start Error ,port:{}", rpcContext.getMiniRpcConfig().getSelfServicePort(), e);
        } finally {
            if (!started) {
                stopServer();
            }
        }
    }

    /**
     * Initiates a graceful shutdown of the event loop groups, if any were created.
     * Does not wait for the shutdown to finish; calling it again is a no-op.
     */
    public void stopServer() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
            bossGroup = null;
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
            workerGroup = null;
        }
    }

    /**
     * Handles one delimiter-separated frame: deserializes it into a
     * {@link RemoteCallPackage}, executes the call and writes the result bytes back.
     *
     * <p>Empty frames are ignored. Any exception is logged and not propagated, in which
     * case nothing is written for that frame. The inbound message is always released.
     *
     * @param ctx channel context used to allocate and write the response buffer
     * @param msg the inbound frame; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            if (!buf.isReadable()) {
                return;
            }
            byte[] bs = new byte[buf.readableBytes()];
            buf.readBytes(bs);
            // Only pay for decoding the payload into a String when it will actually be logged.
            if (logger.isInfoEnabled()) {
                logger.info("收到消息:{}", new String(bs, StandardCharsets.UTF_8));
            }
            RemoteCallPackage remoteCallPackage = rpcContext.getSerializableFactory().derialize(bs, RemoteCallPackage.class);
            byte[] resultJson = this.rpcContext.getAnswerRemoteCall().execute(remoteCallPackage);
            ByteBuf response = ctx.alloc().buffer(resultJson.length);
            response.writeBytes(resultJson);
            // writeAndFlush takes over ownership of the response buffer.
            ctx.writeAndFlush(response);
        } catch (Exception e) {
            logger.error("回复客户端时发生异常", e);
        } finally {
            release(msg);
        }
    }

}
